
fix: handle premature stream termination for Anthropic (#1868) #2047

Merged
gautamsirdeshmukh merged 1 commit into strands-agents:main from gautamsirdeshmukh:main
Apr 8, 2026

Conversation

@gautamsirdeshmukh
Contributor

Problem

The Anthropic provider's stream method reads event.message.usage from the last iterated stream event to extract token usage metadata. However, if the stream terminates prematurely, the last event can be of a type that has no .message attribute (for example, a content block event), and this line crashes with an AttributeError. If no events are received at all, event is never bound and the same line raises an UnboundLocalError.
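The failure mode can be reproduced without the SDK. The sketch below is only an illustration: `FakeMessageStop`, `FakeContentBlockDelta`, and `read_usage_old_style` are hypothetical stand-ins, not the provider's actual classes or code.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeMessage:
    usage: dict


@dataclass
class FakeMessageStop:
    # Hypothetical stand-in for a final event that carries a message with usage.
    message: FakeMessage
    type: str = "message_stop"


@dataclass
class FakeContentBlockDelta:
    # Hypothetical stand-in for an event type that has no .message attribute.
    type: str = "content_block_delta"


async def fake_stream(events):
    for event in events:
        yield event


async def read_usage_old_style(events):
    """Mimic the old pattern: read usage off whichever event was iterated last."""
    async for event in fake_stream(events):
        pass  # a real provider would format and yield each event here
    # Fails with AttributeError if the last event has no .message,
    # and with UnboundLocalError if the loop body never ran.
    return event.message.usage
```

Calling `asyncio.run(read_usage_old_style([FakeContentBlockDelta()]))` raises AttributeError, and passing an empty list raises UnboundLocalError, matching the two crash types shown in the demo output.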

Solution

Instead of reading event.message.usage from the last stream event, we now call the Anthropic SDK's stream.get_final_message() method, which returns a "message snapshot" accumulated from all received events rather than relying on the last event's state. This call is wrapped in a try/except/else block, so that if it fails (which is only possible when zero events were received) we log a warning instead of crashing.
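A minimal sketch of the try/except/else shape, assuming a stream object whose get_final_message() raises AssertionError when no snapshot exists (the exception type the review in this thread settles on). `FakeStream`, `FakeUsage`, `FakeMessage`, and `collect_usage` are hypothetical names for illustration; this is not the merged provider code.

```python
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class FakeUsage:
    input_tokens: int
    output_tokens: int


@dataclass
class FakeMessage:
    usage: FakeUsage


class FakeStream:
    """Hypothetical stand-in for the SDK stream; not the real class."""

    def __init__(self, events, usage=None):
        self._events = list(events)
        # Assumption for this sketch: a snapshot exists only if events arrived.
        self._snapshot = FakeMessage(usage) if self._events and usage else None

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for event in self._events:
            yield event

    async def get_final_message(self):
        if self._snapshot is None:
            raise AssertionError("no message snapshot available")
        return self._snapshot


async def collect_usage(stream):
    async for _event in stream:
        pass  # a real provider would format and yield each event here
    try:
        final_message = await stream.get_final_message()
    except AssertionError:
        # Zero events were received, so there is no usage to report.
        logger.warning("no events received from stream; usage unavailable")
        return None
    else:
        usage = final_message.usage
        return {"input_tokens": usage.input_tokens, "output_tokens": usage.output_tokens}
```

Because the usage is read from the snapshot rather than from the loop variable, a stream that stops after any event type still yields a usage dict in this model.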

Possible Concerns

One may worry that merely logging a warning when the get_final_message() call fails could lead to undercounted token usage. However, this method only fails when the stream yields zero events, in which case there is no usage data to report anyway. If one or more events were received, the Anthropic SDK guarantees that the snapshot contains usage data (initialized by the mandatory message_start event), and get_final_message() will succeed.
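The argument above can be illustrated with a toy model of snapshot accumulation. This is a deliberately simplified sketch and not the SDK's implementation: `ToySnapshot` and `usage_after` are hypothetical, events are plain dicts, and the model ignores any later usage updates the real SDK may apply.

```python
class ToySnapshot:
    """Toy accumulator: usage is initialized by the first message_start event."""

    def __init__(self):
        self._message = None

    def feed(self, event):
        if event["type"] == "message_start":
            # Copy so later changes to the event dict do not affect the snapshot.
            self._message = {"usage": dict(event["usage"])}
        # Other event types leave usage untouched in this toy model.

    def final(self):
        if self._message is None:
            raise AssertionError("no events were fed")
        return self._message


def usage_after(events):
    snapshot = ToySnapshot()
    for event in events:
        snapshot.feed(event)
    return snapshot.final()["usage"]
```

In this model, any event sequence that begins with message_start has usage available no matter where it is cut off, and only the empty sequence fails.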

Related Issues

#1868

Documentation PR

N/A

Type of Change

Bug fix

Testing

How have you tested the change? Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare (added a few tests to cover premature termination + empty stream cases)
  • I also performed a demo of 6 scenarios to compare behavior of old vs new code (for Sonnet and Opus)
======================================================================
  #1868 Fix: Before vs After — claude-sonnet-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-sonnet-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  #1868 Fix: Before vs After — claude-opus-4-6
======================================================================

  ──────────────────────────────────────────────────────────────────
  Scenario 1: Normal completion
  Full stream with message_stop. Both paths should succeed.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ Last event type: message_stop
    ✅ event.message.usage → {'input_tokens': 100, 'output_tokens': 50}

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop', 'message_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=50, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 50}

  ──────────────────────────────────────────────────────────────────
  Scenario 2: Premature termination (TextEvent)
  Stream dies after TextEvent. TextEvent has no .message attribute.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'text']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: text
    ❌ event.message.usage → CRASH: AttributeError: 'TextEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=25, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 25}

  ──────────────────────────────────────────────────────────────────
  Scenario 3: Premature termination (ContentBlockDeltaEvent)
  Stream dies after content_block_delta. No .message on this event type.
  Stream events: ['message_start', 'content_block_delta']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_delta
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockDeltaEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_delta']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=100, output_tokens=10, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 100, 'output_tokens': 10}

  ──────────────────────────────────────────────────────────────────
  Scenario 4: Premature termination (ContentBlockStartEvent)
  Stream dies right after content_block_start. No content delivered.
  Stream events: ['message_start', 'content_block_start']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_start
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStartEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=200, output_tokens=0, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 200, 'output_tokens': 0}

  ──────────────────────────────────────────────────────────────────
  Scenario 5: Content delivered, dies before message_stop
  Full content block delivered but stream dies before message_stop.
  Stream events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: content_block_stop
    ❌ event.message.usage → CRASH: AttributeError: 'RawContentBlockStopEvent' object has no attribute 'message'

  NEW (stream.get_final_message() snapshot):
    ✅ Events: ['message_start', 'content_block_start', 'content_block_delta', 'content_block_stop']
    ✅ get_final_message() returned snapshot:
       id=msg_test, model=claude-opus-4-6
       stop_reason=None, content=[]
       usage=Usage(cache_creation=None, cache_creation_input_tokens=0, cache_read_input_tokens=0, inference_geo=None, input_tokens=150, output_tokens=40, server_tool_use=None, service_tier=None)
    ✅ Extracted: {'input_tokens': 150, 'output_tokens': 40}

  ──────────────────────────────────────────────────────────────────
  Scenario 6: Empty stream (zero events)
  Immediate connection failure. No events received at all.
  Stream events: []
  ──────────────────────────────────────────────────────────────────

  OLD (event.message.usage on last iterated event):
    ❌ Last event type: (none)
    ❌ event.message.usage → CRASH: UnboundLocalError: cannot access local variable 'event' where it is not associated with a value

  NEW (stream.get_final_message() snapshot):
    ⚠️  Events: []
    ⚠️  get_final_message() → raised exception (caught)
    ⚠️  snapshot.usage → not available (warning logged)

======================================================================
  Done.
======================================================================

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@codecov

codecov bot commented Apr 3, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.

@gautamsirdeshmukh
Contributor Author

gautamsirdeshmukh commented Apr 3, 2026

Looking into why 1-3 integ tests continue to time out intermittently. This change to the Anthropic stream method has no connection to the failing multi-agent execution cases; test concurrency may need to be optimized.

Edit: Looks like #2044 is seeing the same test failure. Certainly unrelated to either change, as expected.

@mkmeral
Contributor

mkmeral commented Apr 6, 2026

/strands review

@gautamsirdeshmukh
Contributor Author

/strands are there any issues with this change?

@JackYPCOnline
Contributor

/strands review

@github-actions

github-actions bot commented Apr 6, 2026

Assessment: Request Changes

Good fix for a real crash scenario. The approach of using stream.get_final_message() is sound and well-motivated. Two issues need to be addressed before merging:

Review Details
  • Exception handling: The except Exception catch is too broad (as noted by @Unshure). The Anthropic SDK raises AssertionError specifically from get_final_message(). Catching all exceptions masks real bugs — including the one described below.
  • Broken test coverage: test_structured_output (line 892) still uses agenerator (a plain async generator without get_final_message()). It currently only passes because the broad except Exception silently swallows the AttributeError. Narrowing the catch (which is the right thing to do) will expose this as a test failure. This test needs the same mock stream pattern used in the new tests.
  • Test duplication: The mock stream setup is repeated across 3 tests. Consider extracting a shared fixture (as @Unshure suggested), which would also naturally fix the test_structured_output issue.
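To make the masking concern concrete, here is a small sketch comparing a broad and a narrow catch when the "stream" is a plain async generator with no get_final_message() method. The function names are hypothetical and the code is an illustration, not the test or provider code from this PR.

```python
import asyncio


async def plain_generator():
    # A bare async generator: iterable, but it has no get_final_message().
    yield {"type": "message_start"}


async def usage_with_broad_catch(stream):
    async for _event in stream:
        pass
    try:
        return await stream.get_final_message()
    except Exception:
        # Also swallows the AttributeError caused by the missing method.
        return None


async def usage_with_narrow_catch(stream):
    async for _event in stream:
        pass
    try:
        return await stream.get_final_message()
    except AssertionError:
        # Only the "no snapshot" case is handled; other errors propagate.
        return None
```

With the broad catch the call quietly returns None, so a test built on such a generator keeps passing; with the narrow catch the AttributeError surfaces.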

The core fix is solid and the thorough scenario testing in the PR description is appreciated.

@gautamsirdeshmukh
Contributor Author

/strands review

@github-actions

github-actions bot commented Apr 6, 2026

Assessment: Approve

All previous review feedback has been addressed cleanly. The exception catch is now scoped to AssertionError (matching the Anthropic SDK's actual raise), the generate_mock_stream helper eliminates test duplication, and test_structured_output has been updated to use the new pattern.

Review Summary
  • Production code: The stream.get_final_message() approach with try/except AssertionError/else is correct and resilient. The logging format follows project conventions.
  • Test coverage: Three scenarios covered — normal completion, early termination, and empty stream. The test_structured_output test is also properly updated.
  • Helper design: generate_mock_stream() handles both success (return_value) and failure (side_effect) cases via the isinstance(final_message, Exception) check, which is clean.
  • Documentation: N/A appropriate — this is a bug fix with no user-facing API changes.
  • API review: Not required — no public API surface changes.
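For readers who have not seen the helper, a mock stream along the lines described above could look roughly like this. It is a hypothetical reconstruction using unittest.mock: the name `make_mock_stream` and its signature are assumptions, and the actual helper in the PR may differ.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


def make_mock_stream(events, final_message):
    """Illustrative helper; name and signature are assumptions, not the PR's code."""
    stream = MagicMock()
    # MagicMock supports async iteration via the __aiter__ return value.
    stream.__aiter__.return_value = list(events)
    if isinstance(final_message, Exception):
        # Failure case: awaiting get_final_message() raises the given exception.
        stream.get_final_message = AsyncMock(side_effect=final_message)
    else:
        # Success case: awaiting get_final_message() returns the given message.
        stream.get_final_message = AsyncMock(return_value=final_message)
    return stream


async def drain(stream):
    seen = [event async for event in stream]
    return seen, await stream.get_final_message()
```

Passing an exception instance as `final_message` covers the empty-stream case, while any other value covers normal completion and early termination.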

Solid fix with thorough testing.

@gautamsirdeshmukh
Contributor Author

/strands review

@github-actions

github-actions bot commented Apr 6, 2026

Assessment: Approve

All prior review feedback has been addressed. No new issues found.

Review Details
  • Exception handling: except AssertionError correctly matches the Anthropic SDK's get_final_message() failure mode (assert self.__final_message_snapshot is not None). Since until_done() is a no-op after the stream is fully iterated, no other exceptions are expected from this call path.
  • Test helper: generate_mock_stream_context() cleanly encapsulates both the mock stream (with __aiter__ and get_final_message) and the async context manager, eliminating the prior duplication. All 4 call sites pass final_message explicitly.
  • Test coverage: Normal completion, early termination, and empty stream scenarios are covered. test_structured_output is properly updated. All agenerator references removed from affected tests.
  • Conventions: Logging follows AGENTS.md structured format. No public API changes, so no docs PR or API review needed.

gautamsirdeshmukh merged commit 2f9ffb1 into strands-agents:main on Apr 8, 2026
21 checks passed

@github-actions

github-actions bot commented Apr 8, 2026

Assessment: Approve

This PR has been merged. Confirming the final state looks good — no new issues found.

Review Details
  • Production code: stream.get_final_message() with except AssertionError correctly targets the Anthropic SDK's specific failure mode. Logging follows AGENTS.md conventions.
  • Test helper: generate_mock_stream_context() cleanly encapsulates mock stream + async context setup. All 4 call sites pass final_message explicitly.
  • Test coverage: Normal completion, early termination, empty stream, and structured output scenarios all covered. All agenerator references removed. Codecov confirms full coverage.
  • No API review needed: Internal bug fix with no public API surface changes.
  • No docs PR needed: Bug fix with no user-facing behavior changes to document.

mkmeral added a commit to mkmeral/sdk-python that referenced this pull request Apr 8, 2026
The test_stream_message_stop_no_pydantic_warnings test was using a raw
async generator as the stream mock, which lacks get_final_message().
This caused an AttributeError on Python 3.14 after PR strands-agents#2047 introduced
stream.get_final_message() in the Anthropic provider.

Switch to generate_mock_stream_context helper which properly mocks the
stream object with get_final_message support.

5 participants